/*-------------------------------------------------------------------------
 *
 * metadata_utility.h
 *	  Type and function declarations used for reading and modifying
 *    coordinator node's metadata.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#ifndef METADATA_UTILITY_H
#define METADATA_UTILITY_H

#include "postgres.h"

#include "access/heapam.h"
#include "access/htup.h"
#include "access/tupdesc.h"
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "utils/acl.h"
#include "utils/relcache.h"

#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/errormessage.h"
#include "distributed/relay_utility.h"
#include "distributed/worker_manager.h"

/* total number of hash tokens (2^32) */
#define HASH_TOKEN_COUNT INT64CONST(4294967296)
#define SELECT_TRUE_QUERY "SELECT TRUE FROM %s LIMIT 1"
#define PG_TABLE_SIZE_FUNCTION "pg_table_size(%s)"
#define PG_RELATION_SIZE_FUNCTION "pg_relation_size(%s)"
#define PG_TOTAL_RELATION_SIZE_FUNCTION "pg_total_relation_size(%s)"
#define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)"
#define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)"
#define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \
    "worker_partitioned_relation_total_size(%s)"

#define SHARD_SIZES_COLUMN_COUNT (2)

/* the default namespace for spq plugin's tool functions. */
extern const char* SPQ_NS_NAME;

/* In-memory representation of a typed tuple in pg_dist_shard. */
typedef struct ShardInterval {
    CitusNode type;
    Oid relationId;
    char storageType;
    Oid valueTypeId;  /* min/max value datum's typeId */
    int valueTypeLen; /* min/max value datum's typelen */
    bool valueByVal;  /* min/max value datum's byval */
    bool minValueExists;
    bool maxValueExists;
    Datum minValue; /* a shard's typed min value datum */
    Datum maxValue; /* a shard's typed max value datum */
    uint64 shardId;
    int shardIndex;
} ShardInterval;

/* In-memory representation of a tuple in pg_dist_placement. */
typedef struct GroupShardPlacement {
    CitusNode type;
    uint64 placementId; /* sequence that implies this placement creation order */
    uint64 shardId;
    uint64 shardLength;
    int32 groupId;
} GroupShardPlacement;

/* A GroupShardPlacement which has had some extra data resolved */
typedef struct ShardPlacement {
    /*
     * careful, the rest of the code assumes this exactly matches GroupShardPlacement
     */
    CitusNode type;
    uint64 placementId;
    uint64 shardId;
    uint64 shardLength;
    int32 groupId;

    /* the rest of the fields aren't from pg_dist_placement */
    char* nodeName;
    uint32 nodePort;
    uint32 nodeId;
    char partitionMethod;
    uint32 colocationGroupId;
    uint32 representativeValue;
} ShardPlacement;

typedef enum CascadeToColocatedOption {
    CASCADE_TO_COLOCATED_UNSPECIFIED,
    CASCADE_TO_COLOCATED_YES,
    CASCADE_TO_COLOCATED_NO,
    CASCADE_TO_COLOCATED_NO_ALREADY_CASCADED
} CascadeToColocatedOption;

/*
 * TableConversionParameters are the parameters that are given to
 * table conversion UDFs: undistribute_table, alter_distributed_table,
 * alter_table_set_access_method.
 *
 * When passing a TableConversionParameters object to one of the table
 * conversion functions some of the parameters needs to be set:
 * UndistributeTable: relationId
 * AlterDistributedTable: relationId, distributionColumn, shardCountIsNull,
 * shardCount, colocateWith, cascadeToColocated
 * AlterTableSetAccessMethod: relationId, accessMethod
 *
 * conversionType parameter will be automatically set by the function.
 *
 * TableConversionState objects can be created using TableConversionParameters
 * objects with CreateTableConversion function.
 */
typedef struct TableConversionParameters {
    /*
     * Determines type of conversion: UNDISTRIBUTE_TABLE,
     * ALTER_DISTRIBUTED_TABLE, ALTER_TABLE_SET_ACCESS_METHOD.
     */
    char conversionType;

    /* Oid of the table to do conversion on */
    Oid relationId;

    /*
     * Options to do conversions on the table
     * distributionColumn is the name of the new distribution column,
     * shardCountIsNull is if the shardCount variable is not given
     * shardCount is the new shard count,
     * colocateWith is the name of the table to colocate with, 'none', or
     * 'default'
     * accessMethod is the name of the new accessMethod for the table
     */
    char* distributionColumn;
    bool shardCountIsNull;
    int shardCount;
    char* colocateWith;
    char* accessMethod;

    /*
     * cascadeToColocated determines whether the shardCount and
     * colocateWith will be cascaded to the currently colocated tables
     */
    CascadeToColocatedOption cascadeToColocated;

    /*
     * cascadeViaForeignKeys determines if the conversion operation
     * will be cascaded to the graph connected with foreign keys
     * to the table
     */
    bool cascadeViaForeignKeys;

    /*
     * suppressNoticeMessages determines if we want to suppress NOTICE
     * messages that we explicitly issue
     */
    bool suppressNoticeMessages;
} TableConversionParameters;

typedef struct TableConversionReturn {
    /*
     * commands to create foreign keys for the table
     *
     * When the table conversion is cascaded we can recreate
     * some of the foreign keys of the cascaded tables. So with this
     * list we can return it to the initial conversion operation so
     * foreign keys can be created after every colocated table is
     * converted.
     */
    List* foreignKeyCommands;
} TableConversionReturn;

/*
 * Size query types for PG and Citus
 * For difference details, please see:
 * https://www.postgresql.org/docs/13/functions-admin.html#FUNCTIONS-ADMIN-DBSIZE
 */
typedef enum SizeQueryType {
    RELATION_SIZE,       /* pg_relation_size() */
    TOTAL_RELATION_SIZE, /* pg_total_relation_size() */
    TABLE_SIZE           /* pg_table_size() */
} SizeQueryType;

typedef enum {
    COLOCATE_WITH_TABLE_LIKE_OPT,
    COLOCATE_WITH_COLOCATION_ID
} ColocationParamType;

/*
 * Param used to specify the colocation target of a distributed table. It can
 * be either a table name or a colocation id.
 *
 * When colocationParamType is COLOCATE_WITH_COLOCATION_ID, colocationId is
 * expected to be a valid colocation id. When colocationParamType is set to
 * COLOCATE_WITH_TABLE_LIKE_OPT, colocateWithTableName is expected to
 * be a valid table name, "default" or "none".
 *
 * Among the functions used to create a Citus table, right now only
 * CreateSingleShardTable() accepts a ColocationParam.
 */
typedef struct {
    union {
        char* colocateWithTableName;
        uint32 colocationId;
    };

    ColocationParamType colocationParamType;
} ColocationParam;

typedef enum BackgroundJobStatus {
    BACKGROUND_JOB_STATUS_SCHEDULED,
    BACKGROUND_JOB_STATUS_RUNNING,
    BACKGROUND_JOB_STATUS_FINISHED,
    BACKGROUND_JOB_STATUS_CANCELLING,
    BACKGROUND_JOB_STATUS_CANCELLED,
    BACKGROUND_JOB_STATUS_FAILING,
    BACKGROUND_JOB_STATUS_FAILED
} BackgroundJobStatus;

typedef struct BackgroundJob {
    int64 jobid;
    BackgroundJobStatus state;
    char* jobType;
    char* description;
    TimestampTz* started_at;
    TimestampTz* finished_at;

    /* extra space to store values for nullable value types above */
    struct {
        TimestampTz started_at;
        TimestampTz finished_at;
    } __nullable_storage;
} BackgroundJob;

typedef enum BackgroundTaskStatus {
    BACKGROUND_TASK_STATUS_BLOCKED,
    BACKGROUND_TASK_STATUS_RUNNABLE,
    BACKGROUND_TASK_STATUS_RUNNING,
    BACKGROUND_TASK_STATUS_CANCELLING,
    BACKGROUND_TASK_STATUS_DONE,
    BACKGROUND_TASK_STATUS_ERROR,
    BACKGROUND_TASK_STATUS_UNSCHEDULED,
    BACKGROUND_TASK_STATUS_CANCELLED
} BackgroundTaskStatus;

typedef struct BackgroundTask {
    int64 jobid;
    int64 taskid;
    Oid owner;
    int32* pid;
    BackgroundTaskStatus status;
    char* command;
    int32* retry_count;
    TimestampTz* not_before;
    char* message;
    List* nodesInvolved;

    /* extra space to store values for nullable value types above */
    struct {
        int32 pid;
        int32 retry_count;
        TimestampTz not_before;
    } __nullable_storage;
} BackgroundTask;

#define SET_NULLABLE_FIELD(ptr, field, value)  \
    (ptr)->__nullable_storage.field = (value); \
    (ptr)->field = &((ptr)->__nullable_storage.field)

#define UNSET_NULLABLE_FIELD(ptr, field) \
    (ptr)->field = NULL;                 \
    memset_struct_0((ptr)->__nullable_storage.field)

/* Size functions */
extern "C" Datum spq_table_size(PG_FUNCTION_ARGS);
extern "C" Datum spq_total_relation_size(PG_FUNCTION_ARGS);
extern "C" Datum spq_relation_size(PG_FUNCTION_ARGS);

extern char* DISABLE_COLOCATION_NAME;

/* Function declarations to read shard and shard placement data */
extern uint32 TableShardReplicationFactor(Oid relationId);
extern List* LoadShardIntervalList(Oid relationId);
extern List* LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
extern ShardInterval* LoadShardIntervalWithLongestShardName(Oid relationId);
extern int ShardIntervalCount(Oid relationId);
extern List* LoadShardList(Oid relationId);
extern ShardInterval* CopyShardInterval(ShardInterval* srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId);
extern bool IsActiveShardPlacement(ShardPlacement* ShardPlacement);
extern bool IsRemoteShardPlacement(ShardPlacement* shardPlacement);
extern bool IsPlacementOnWorkerNode(ShardPlacement* placement, WorkerNode* workerNode);
extern List* FilterShardPlacementList(List* shardPlacementList,
                                      bool (*filter)(ShardPlacement*));
extern List* FilterActiveShardPlacementListByNode(List* shardPlacementList,
                                                  WorkerNode* workerNode);
extern List* ActiveShardPlacementListOnGroup(uint64 shardId, int32 groupId);
extern List* ActiveShardPlacementList(uint64 shardId);
extern List* ShardPlacementListSortedByWorker(uint64 shardId);
extern ShardPlacement* ActiveShardPlacement(uint64 shardId, bool missingOk);
extern WorkerNode* ActiveShardPlacementWorkerNode(uint64 shardId);
extern List* BuildShardPlacementList(int64 shardId);
extern List* AllShardPlacementsOnNodeGroup(int32 groupId);
extern List* GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId);
extern void LookupTaskPlacementHostAndPort(ShardPlacement* taskPlacement, char** nodeName,
                                           int* nodePort);
extern bool IsDummyPlacement(ShardPlacement* taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(
    List* shardIntervalList, SizeQueryType sizeQueryType,
    bool optimizePartitionCalculations);
extern List* RemoveCoordinatorPlacementIfNotSingleNode(List* placementList);

/* Function declarations to modify shard and shard placement data */
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
                           text* shardMinValue, text* shardMaxValue);
extern void DeleteShardRow(uint64 shardId);
extern ShardPlacement* InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
                                                       uint64 shardLength, int32 groupId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
                                      uint64 shardLength, int32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
                                      Var* distributionColumn, uint32 colocationId,
                                      char replicationModel, bool autoConverted);
extern void UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted);
extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMethod,
                                             Var* distributionColumn, int colocationId);
extern void UpdateDistributionColumn(Oid relationId, char distributionMethod,
                                     Var* distributionColumn, int colocationId);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
                                                uint32 colocationId, bool autoConverted);
extern void UpdateNoneDistTableMetadata(Oid relationId, char replicationModel,
                                        uint32 colocationId, bool autoConverted);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRowGlobally(uint64 placementId);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam);
extern void CreateDistributedTable(Oid relationId, char* distributionColumnName,
                                   char distributionMethod, int shardCount,
                                   bool shardCountIsStrict, char* colocateWithTableName);
extern void CreateReferenceTable(Oid relationId);
extern void CreateTruncateTrigger(Oid relationId);
extern uint64 CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId);
extern TableConversionReturn* UndistributeTable(TableConversionParameters* params);
extern void UndistributeTables(List* relationIdList);

extern void EnsureAllObjectDependenciesExistOnAllNodes(const List* targets);
extern DeferredErrorMessage* DeferErrorIfCircularDependencyExists(
    const ObjectAddress* objectAddress);
extern List* GetDistributableDependenciesForObject(const ObjectAddress* target);
extern List* GetAllDependencyCreateDDLCommands(const List* dependencies);
extern bool ShouldPropagate(void);
extern bool ShouldPropagateDDL(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateAnyObject(List* addresses);

/* Remaining metadata utility functions  */
extern Oid TableOwnerOid(Oid relationId);
extern char* TableOwner(Oid relationId);
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
extern void EnsureTableOwner(Oid relationId);
extern void EnsureHashDistributedTable(Oid relationId);
extern void EnsureHashOrSingleShardDistributedTable(Oid relationId);
extern void EnsureFunctionOwner(Oid functionId);
extern void EnsureSchemaOwner(Oid schemaId);
extern void EnsureSuperUser(void);
extern void ErrorIfTableIsACatalogTable(Relation relation);
extern void EnsureTableNotDistributed(Oid relationId);
extern void EnsureRelationExists(Oid relationId);
extern bool RegularTable(Oid relationId);
extern bool TableEmpty(Oid tableId);
extern bool IsForeignTable(Oid relationId);
extern bool ForeignTableDropsTableNameOption(List* optionList);
extern bool ServerUsesPostgresFdw(Oid serverId);
extern char* ConstructQualifiedShardName(ShardInterval* shardInterval);
extern uint64 GetFirstShardId(Oid relationId);
extern Datum StringToDatum(char* inputString, Oid dataType);
extern char* DatumToString(Datum datum, Oid dataType);
extern int CompareShardPlacementsByWorker(const void* leftElement,
                                          const void* rightElement);
extern int CompareShardPlacementsByGroupId(const void* leftElement,
                                           const void* rightElement);
extern ShardInterval* DeformedDistShardTupleToShardInterval(Datum* datumArray,
                                                            bool* isNullArray,
                                                            Oid intervalTypeId,
                                                            int32 intervalTypeMod);
extern void GetIntervalTypeInfo(char partitionMethod, Var* partitionColumn,
                                Oid* intervalTypeId, int32* intervalTypeMod);
extern List* SendShardStatisticsQueriesInParallel(List* citusTableIds,
                                                  bool useDistributedTransaction);
extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection* connection,
                                               uint64* availableBytes,
                                               uint64* totalBytes);
extern void ExecuteQueryViaSPI(char* query, int SPIOK);
extern void ExecuteAndLogQueryViaSPI(char* query, int SPIOK, int logLevel);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId,
                                        Oid ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void EnsureRelationHasCompatibleSequenceTypes(Oid relationId);
extern bool HasRunnableBackgroundTask(void);
extern bool HasNonTerminalJobOfType(const char* jobType, int64* jobIdOut);
extern int64 CreateBackgroundJob(const char* jobType, const char* description);
extern BackgroundTask* ScheduleBackgroundTask(int64 jobId, Oid owner, char* command,
                                              int dependingTaskCount,
                                              int64 dependingTaskIds[],
                                              int nodesInvolvedCount,
                                              int32 nodesInvolved[]);
extern BackgroundTask* GetRunnableBackgroundTask(void);
extern void ResetRunningBackgroundTasks(void);
extern BackgroundJob* GetBackgroundJobByJobId(int64 jobId);
extern BackgroundTask* GetBackgroundTaskByTaskId(int64 taskId);
extern void UpdateBackgroundJob(int64 jobId);
extern void UpdateBackgroundTask(BackgroundTask* task);
extern void UpdateJobStatus(int64 taskId, const pid_t* pid, BackgroundTaskStatus status,
                            const int32* retry_count, char* message);
extern bool UpdateJobError(BackgroundTask* job, ErrorData* edata);
extern List* CancelTasksForJob(int64 jobid);
extern void UnscheduleDependentTasks(BackgroundTask* task);
extern void UnblockDependingBackgroundTasks(BackgroundTask* task);
extern BackgroundJobStatus BackgroundJobStatusByOid(Oid enumOid);
extern BackgroundTaskStatus BackgroundTaskStatusByOid(Oid enumOid);
extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
#endif /* METADATA_UTILITY_H */
